package org.example.common

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.serializer.KryoSerializer
import org.example.constant.ApolloConst
import org.example.listener.LifecycleListener

/**
 * Shared Spark configuration mixin.
 *
 * Mixing this trait in lowers the level of several noisy loggers at initialization
 * time, and exposes a pre-populated [[SparkConf]] plus a helper that builds Kafka
 * consumer parameters.
 */
trait Sparking {

  // Suppress unnecessary framework logging so only relevant output reaches the console.
  Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
  Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.WARN)
  Logger.getLogger("org.apache.kafka.clients.consumer").setLevel(Level.WARN)

  /**
   * Base Spark configuration shared by jobs that mix in this trait.
   *
   * Master and application name are not set here; presumably they are supplied by
   * spark-submit or by the concrete job — TODO confirm.
   */
  val conf: SparkConf = new SparkConf()
    // Serialization format.
    .set("spark.serializer", classOf[KryoSerializer].getName)
    //.set("spark.kryo.registrationRequired", "true")
    // Hook for graceful shutdown of streaming jobs.
    .set("spark.streaming.stopGracefullyOnShutdown", "true")
    // Register the project's lifecycle listener.
    .set("spark.extraListeners", classOf[LifecycleListener].getName)
    // Hive dynamic partitioning. This key was previously misspelled
    // ("hive.exec.dynamici.partition"), so the intended setting was never applied.
    .set("hive.exec.dynamic.partition", "true")
    .set("hive.exec.dynamic.partition.mode", "nonstrict") // non-strict mode
    .set("hive.exec.max.dynamic.partitions", "100000")
    .set("hive.exec.max.dynamic.partitions.pernode", "100000")
    // Speculative-execution tuning. These keys only take effect when
    // "spark.speculation" is enabled, which is currently left off:
    //.set("spark.speculation", "true")
    .set("spark.speculation.interval", "1000") // speculation check interval
    .set("spark.speculation.quantile", "0.75") // fraction of tasks done before speculating
    .set("spark.speculation.multiplier", "1.5") // how many times slower than others before speculating
    .set("spark.sql.broadcastTimeout", "100000")
    .set("spark.io.compression.codec", "snappy")
    .set("spark.sql.crossJoin.enabled", "true")
    // Elasticsearch connector settings.
    .set("es.nodes", ApolloConst.esNodes)
    .set("es.port", ApolloConst.esPort)
    .set("es.mapping.date.rich", "false")
    .set("es.nodes.wan.only", "true")
    // NOTE(review): hard-coded HDFS location of the Spark jars for YARN.
    .set("spark.yarn.jars", "hdfs://hadoop/yyc3.2/jars/*")

  /**
   * Builds the Kafka consumer configuration map.
   *
   * @param servers value for `bootstrap.servers`
   * @param groupId value for `group.id`
   * @return consumer parameters with String key/value deserializers and
   *         auto-commit disabled
   */
  def getKafkaParams(servers: String, groupId: String): Map[String, Object] =
    Map[String, Object](
      "bootstrap.servers" -> servers,
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> groupId,
      // Auto-commit is off; offsets are presumably committed explicitly by the caller — confirm.
      "enable.auto.commit" -> (false: java.lang.Boolean),
      // NOTE(review): a 0 ms backoff means reconnects to an unavailable broker are
      // retried without delay. Confirm this is intentional.
      "reconnect.backoff.ms" -> "0"
    )

}
